---
title: Implementing DASE
---

This section gives you an overview of DASE components and how to implement them. You will find links to some engine templates for more concrete examples.

# DataSource

DataSource reads and selects useful data from the Event Store (data store of the Event Server) and returns TrainingData.

## readTraining()

You need to implment readTraining() of [PDataSource](https://docs.prediction.io/api/current/#org.apache.predictionio.controller.PDataSource), where you can use the [PEventStore Engine API](https://docs.prediction.io/api/current/#org.apache.predictionio.data.store.PEventStore$) to read the events and create the TrainingData based on the events.

The following code example reads user "view" and "buy" item events, filters specific type of events for future processing and returns TrainingData accordingly.

```scala
class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData,
      EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val eventsRDD: RDD[Event] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("view", "buy")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
      .cache()

    val viewEventsRDD: RDD[ViewEvent] = eventsRDD
      .filter { event => event.event == "view" }
      .map { ... }

    ...

    new TrainingData(...)
  }

}
```


## Using PEventStore Engine API

Please see [Event Server Overview](https://docs.prediction.io/datacollection/) to understand [EventAPI](https://docs.prediction.io/datacollection/eventapi/) and [event modeling](https://docs.prediction.io/datacollection/eventmodel/).

With [PEventStore Engine API](https://docs.prediction.io/api/current/#org.apache.predictionio.data.store.PEventStore$), you can easily read different events in DataSource and get the information you need.

For example, let's say you have events like the following:

```json
{
  "event": "myEvent",
  "entityType": "user",
  "entityId": "u0",
  "targetEntityType": "item",
  "targetEntityId": "i0",
  "properties" : {
    "a" : 3,
    "b" : "some_string",
    "c" : ["a", "b", "c"],
    "d" : [1.2, 3.4, 5.6],
    "e" : 6
  }
}
```

Then following code could read these events and extract the properties field of the event and convert it to a `MyEvent` object.

```scala
  val myEvents: RDD[MyEvent] = PEventStore.find(
      appName = dsp.appName,
      entityType = Some("user"),
      eventNames = Some(List("myEvent")),
      // targetEntityType is optional field of an event.
      targetEntityType = Some(Some("item")))(sc)
    .map { event =>
      try {
        MyEvent(
          entityId = event.entityId,
          targetEntityId = event.targetEntityId.get,
          a = event.properties.get[Int]("a"),
          b = event.properties.get[String]("b"),
          c = event.properties.get[List[String]]("c"),
          d = event.properties.get[List[Double]]("d"),
          e = event.properties.getOpt[Int]("e") // use getOpt for optional data
        )
      } catch {
        case e: Exception =>
          logger.error(s"Cannot convert ${event}. Exception: ${e}.")
          throw e
      }
    }
```

If you have used special events `$set/$unset/$delete` setting entity's properties, you can retrieve it with `PEventStore.aggregateProperties()`.

Please see [event modeling](https://docs.prediction.io/datacollection/eventmodel/) to understand usage of special `$set/$unset/$delete` events.

For example, the following code show how you could retrieve properties of the "item" entities:

```scala
    // create a RDD of (entityID, Item)
    val itemsRDD: RDD[(String, Item)] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "item"
    )(sc).map { case (entityId, properties) =>

      try {
        val item = Item(
          a = preopties.get[Int]("a"),
          b = properties.get[String]("b"),
          c = properties.get[List[String]]("c"),
          d = properties.get[List[Double]]("d"),
          e = properties.getOpt[Int]("e") // use getOpt for optional data
        )

        (entityId, item)
      } catch {
        case e: Exception =>
          logger.error(s"Failed to get properties ${properties} of ${entityId}. Exception: ${e}.")
          throw e
      }

    }
```

Example:

- [DataSource of Similar Product Template](/templates/similarproduct/dase/#data)

# Preparator

Preparator is responsible for pre-processing `TrainingData` for any necessary feature selection and data processing tasks and generate `PreparedData` which contains the data the Algorithm needs.

A few example usages of Preparator:

- Feature extraction
- Common pre-processing logic if you have multiple algorithms
- For simple cases, the Preparator may simply pass the same `TrainingData` as `PreparedData` for Algorithm.

## prepare()

You need to implement the `prepare()` method of [PPrepartor](https://docs.prediction.io/api/current/#org.apache.predictionio.controller.PPreparator) to perform such tasks.

Example:

- [Preparator of Leading Scoring Template](/templates/leadscoring/dase/#data): it pre-processes the TrainingData and generate the feature vectors needed for the algorithm.
- [Preparator of Similar Product Template](/templates/similarproduct/dase/#data): it simply passes the TrainingData as PreparedData for the algorithm.

# Algorithm

The two methods of the Algorithm class are train() and predict():

## train()

train() is responsible for training a predictive model. It is called when you
run `pio train`. Apache PredictionIO (incubating) will store this model.

## predict()

predict() is responsible for using this model to make prediction. It is called when you send a JSON query to the engine. Note that predict() is called in real time.

Apache PredictionIO (incubating) supports two types of algorithms:

- **[P2LAlgorithm](https://docs.prediction.io/api/current/#org.apache.predictionio.controller.P2LAlgorithm)**: trains a Model which does not contain RDD
- **[PAlgorithm](https://docs.prediction.io/api/current/#org.apache.predictionio.controller.PAlgorithm)**: trains a Model which contains RDD

## P2LAlgorithm

For `P2LAlgorithm`, the Model is automatically serialized and persisted by
Apache PredictionIO (incubating) after training.

Implementing `IPersistentModel` and `IPersistentModelLoader` is optional for P2LAlgorithm.

Example:

- [Algorithm of Similar Product Template](/templates/similarproduct/dase/#algorithm)

## PAlgorithm

`PAlgorithm` should be used when your Model contains RDD. The model produced by `PAlgorithm` is not persisted by default. To persist the model, you need to do the following:

- The Model class should extend the `IPersistentModel` trait and implement the `save()` method for saving the model. The trait `IPersistentModel` requires a type parameter which is the class type of algorithm parameter.
- Implement a Model factory object which extends the `IPersistentModelLoader` trait and implement the `apply()` for loading the model. The trait `IPersistentModelLoader` requires two type parameters which are the types of algorithm parameter and the model produced by the algorithm.

Example:

- [Algorithm of Recommendation Template](/templates/recommendation/dase/#algorithm): it implements PAlgorithm and the IPersistentModel and IPersistentModelLoader.
- [Algorithm of Vanilla Template](/templates/vanilla/dase): it walks through example of P2LAlgorithm and PAlgorithm.

## using LEventStore Engine API in predict()

You may use [LEventStore.findByEntity()](https://docs.prediction.io/api/current/#org.apache.predictionio.data.store.LEventStore$) to retrieve events of a specific entity. For example, retrieve recent events of the user specified in the query) and use these recent events to make prediction in real time.


For example, the following code reads the recent 10 view events of `query.user`:

```scala
    val recentEvents = try {
      LEventStore.findByEntity(
        appName = ap.appName,
        // entityType and entityId is specified for fast lookup
        entityType = "user",
        entityId = query.user,
        eventNames = Some(List("view")),
        targetEntityType = Some(Some("item")),
        limit = Some(10),
        latest = true,
        // set time limit to avoid super long DB access
        timeout = Duration(200, "millis")
      )
    } catch {
      case e: scala.concurrent.TimeoutException =>
        logger.error(s"Timeout when read recent events." +
          s" Empty list is used. ${e}")
        Iterator[Event]()
      case e: Exception =>
        logger.error(s"Error when read recent events: ${e}")
        throw e
    }
```


Example:

- [Algorithm of E-Commerce Recommendation template](/templates/ecommercerecommendation/dase#algorithm): LEventStore.findByEntity() is used to retrieve all items seen by the user and filter them from recommendation in predict().


# Serving

## serve()

You need to implement the serve() method of the class [LServing](https://docs.prediction.io/api/current/#org.apache.predictionio.controller.LServing). The serve() method processes predicted result. It is also responsible for combining multiple predicted results into one if you have more than one predictive model.

Example:

- [Serving of Similar Product Template](/templates/similarproduct/dase/#serving): It simply returns the predicted result
- [Serving of multi-algorithm examples of Similar Product Template](/templates/similarproduct/multi-events-multi-algos/): It combines the result of multiple algorithms and return
